package gu.simplemq;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import gu.simplemq.json.BaseJsonEncoder;
import gu.simplemq.pool.BaseMQPool;
import gu.simplemq.pool.BaseMQPool.MQPoolException;

/**
 * Base class for {@link IProducer} and {@link IPublisher} implementations.
 * <p>
 * Serializes outgoing objects to strings (JSON by default), obtains a client
 * from the pool and delegates the actual transmission to
 * {@link #doSend(Object, String, Iterable)}.
 *
 * @param <C> client instance type of the underlying message system
 * @author guyadong
 *
 */
public abstract class BaseMQSender <C> {
	private static final BaseJsonEncoder encoder = BaseJsonEncoder.getEncoder();
	/**
	 * {@link Function} instance that serializes an object to a JSON string;
	 * used as the default serializer.
	 */
	private static final Function<Object, String> jsonFun = new Function<Object,String>(){
		@Override
		public String apply(Object input) {
			return encoder.toJsonString(input);
		}};
	/** Pool from which the client used for sending is obtained; never {@code null}. */
	protected final BaseMQPool<C> pool;
	/**
	 * Serializer that turns an outgoing object into a string;
	 * defaults to JSON serialization ({@link #jsonFun}).
	 */
	private Function<Object, String> stringSerializer = jsonFun;

	/**
	 * @param poolLazy pool supplying client instances, must not be {@code null}
	 * @throws NullPointerException if {@code poolLazy} is {@code null}
	 */
	public BaseMQSender(BaseMQPool<C> poolLazy) {
		super();
		this.pool = checkNotNull(poolLazy,"poolLazy is null");
	}

	/**
	 * Performs the actual message transmission.
	 * @param c client instance obtained from {@link #pool}
	 * @param channel channel name
	 * @param messages serialized messages to send ({@code null} source elements
	 *                 have already been filtered out before serialization)
	 * @throws Exception on any failure; translated by
	 *                   {@link #doProduce(Channel, Collection)}
	 */
	abstract protected void doSend(C c,String channel,Iterable<String> messages) throws Exception;

	/**
	 * Serializes the given objects and sends them to the channel.
	 * Does nothing when {@code c} is {@code null} or holds no non-null element.
	 *
	 * @param channel target channel, must not be {@code null}
	 * @param c objects to send; {@code null} elements are skipped
	 * @throws IllegalArgumentException if {@code channel} is {@code null}
	 * @throws MQConnectionException if a {@link MQPoolException} is raised while
	 *                               obtaining the client or sending
	 * @throws MQRuntimeException on any other failure (the cause is preserved)
	 */
	protected <T> void doProduce(Channel<T> channel, Collection<T>c) {
		checkArgument(channel != null, "channel is null");
		Collection<String> jsons = asJsons(c);
		if(jsons.isEmpty()){
			return ;
		}
		C client = null;
		try{
			client = pool.apply();
			// asJsons() returns a lazy view: every traversal would run the serializer
			// again and would observe later changes to the caller's collection.
			// Take a one-time snapshot so doSend() works on stable, already
			// serialized data. Done inside the try block so that serialization
			// failures are still reported as MQRuntimeException.
			List<String> messages = new ArrayList<String>(jsons);
			doSend(client, channel.name, messages);
		} catch (MQPoolException e) {
			throw new MQConnectionException(e);
		}catch (Throwable e) {
			if(e instanceof InterruptedException){
				// Wrapping would otherwise hide the interruption from the caller:
				// restore the thread's interrupt status before rethrowing.
				Thread.currentThread().interrupt();
			}
			// Let the library's own runtime exceptions through unwrapped.
			Throwables.throwIfInstanceOf(e, MQConnectionException.class);
			Throwables.throwIfInstanceOf(e, MQRuntimeException.class);
			throw new MQRuntimeException(e);
		} finally{
			// Only release when a client was actually obtained.
			if(client != null){
				pool.free();
			}
		}
	}

	/**
	 * Returns a lazy view of {@code c} with {@code null} elements removed and every
	 * remaining element mapped through the current string serializer.
	 *
	 * @param c source collection, may be {@code null}
	 * @return the serialized view, or an empty list when {@code c} is {@code null}
	 */
	private <T> Collection<String> asJsons(Collection<T>c){
		if(null == c ) {
			return Collections.emptyList();
		}
		Collection<T> nonNulls = Collections2.filter(c,Predicates.notNull());
		return Collections2.transform(nonNulls, stringSerializer);
	}

	/**
	 * Sets the serializer used to turn outgoing objects into strings.
	 * The default serializes to JSON; the setting applies to this instance only.
	 * @param stringSerializer when {@code null}, the default serializer
	 *                         {@link #jsonFun} is restored
	 * @since 2.3.11
	 */
	protected void setStringSerializer(Function<Object, String> stringSerializer) {
		this.stringSerializer = null == stringSerializer ? jsonFun: stringSerializer;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append(getClass().getSimpleName()).append(" [pool=");
		builder.append(pool);
		builder.append("]");
		return builder.toString();
	}

}
